今天是我們系列的最後一天!我們要為量化交易系統加上 Telegram 通知功能,這就像為農場安裝現代化的通訊系統一樣。不管我身在何處,都能即時收到農場的重要訊息 - 交易執行、系統狀態、盈虧變化等等!
# src/notifications/telegram_bot.py
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import aiohttp
import json
from dataclasses import dataclass
@dataclass
class TelegramConfig:
bot_token: str
chat_id: str
rate_limit: int = 30 # 每分鐘最多30條訊息
retry_attempts: int = 3
retry_delay: int = 5
class TelegramBot:
"""Telegram 機器人通知系統"""
def __init__(self, config: TelegramConfig):
self.config = config
self.base_url = f"https://api.telegram.org/bot{config.bot_token}"
self.session = None
self.message_queue = asyncio.Queue()
self.rate_limiter = RateLimiter(config.rate_limit)
self.logger = logging.getLogger(__name__)
async def start(self):
"""啟動 Telegram Bot"""
self.session = aiohttp.ClientSession()
# 驗證 Bot Token
bot_info = await self.get_bot_info()
if bot_info:
self.logger.info(f"Telegram Bot started: {bot_info['username']}")
# 啟動訊息處理任務
asyncio.create_task(self._message_processor())
else:
raise Exception("Failed to start Telegram Bot")
async def get_bot_info(self):
"""獲取 Bot 資訊"""
try:
async with self.session.get(f"{self.base_url}/getMe") as response:
if response.status == 200:
data = await response.json()
return data['result']
else:
self.logger.error(f"Failed to get bot info: {response.status}")
return None
except Exception as e:
self.logger.error(f"Error getting bot info: {e}")
return None
async def send_message(self, message: str, parse_mode: str = "HTML",
disable_web_page_preview: bool = True,
priority: str = "normal"):
"""發送訊息到隊列"""
message_data = {
'text': message,
'parse_mode': parse_mode,
'disable_web_page_preview': disable_web_page_preview,
'priority': priority,
'timestamp': datetime.now()
}
await self.message_queue.put(message_data)
async def _message_processor(self):
"""訊息處理器"""
while True:
try:
# 等待訊息
message_data = await self.message_queue.get()
# 檢查速率限制
await self.rate_limiter.wait()
# 發送訊息
await self._send_telegram_message(message_data)
# 標記任務完成
self.message_queue.task_done()
except Exception as e:
self.logger.error(f"Error in message processor: {e}")
await asyncio.sleep(1)
async def _send_telegram_message(self, message_data: Dict):
"""實際發送 Telegram 訊息"""
url = f"{self.base_url}/sendMessage"
payload = {
'chat_id': self.config.chat_id,
'text': message_data['text'],
'parse_mode': message_data['parse_mode'],
'disable_web_page_preview': message_data['disable_web_page_preview']
}
for attempt in range(self.config.retry_attempts):
try:
async with self.session.post(url, json=payload) as response:
if response.status == 200:
self.logger.debug("Message sent successfully")
return
else:
error_text = await response.text()
self.logger.warning(f"Failed to send message: {response.status} - {error_text}")
except Exception as e:
self.logger.error(f"Error sending message (attempt {attempt + 1}): {e}")
if attempt < self.config.retry_attempts - 1:
await asyncio.sleep(self.config.retry_delay)
async def stop(self):
"""停止 Bot"""
if self.session:
await self.session.close()
class RateLimiter:
"""速率限制器"""
def __init__(self, max_messages_per_minute: int):
self.max_messages = max_messages_per_minute
self.message_times = []
async def wait(self):
"""等待速率限制"""
now = datetime.now()
# 清理超過一分鐘的記錄
self.message_times = [
msg_time for msg_time in self.message_times
if now - msg_time < timedelta(minutes=1)
]
# 如果超過限制,等待
if len(self.message_times) >= self.max_messages:
wait_time = 60 - (now - self.message_times[0]).total_seconds()
if wait_time > 0:
await asyncio.sleep(wait_time)
# 記錄這次發送時間
self.message_times.append(now)
# src/notifications/message_templates.py
from datetime import datetime
from typing import Dict, Any
import emoji
class MessageTemplates:
"""訊息模板管理"""
@staticmethod
def trading_signal(signal_data: Dict) -> str:
"""交易信號通知"""
symbol = signal_data.get('symbol', 'Unknown')
action = signal_data.get('action', 'Unknown')
price = signal_data.get('price', 0)
size = signal_data.get('size', 0)
strategy = signal_data.get('strategy', 'Unknown')
action_emoji = {
'buy': '🟢',
'sell': '🔴',
'close': '⚫'
}.get(action.lower(), '🔵')
return f"""
{action_emoji} <b>交易信號</b>
📊 <b>標的:</b>{symbol}
🎯 <b>動作:</b>{action.upper()}
💰 <b>價格:</b>${price:,.2f}
📏 <b>數量:</b>{size}
🤖 <b>策略:</b>{strategy}
🕒 <b>時間:</b>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
@staticmethod
def order_executed(order_data: Dict) -> str:
"""訂單執行通知"""
symbol = order_data.get('symbol', 'Unknown')
side = order_data.get('side', 'Unknown')
size = order_data.get('size', 0)
price = order_data.get('price', 0)
order_id = order_data.get('order_id', 'Unknown')
side_emoji = '🟢' if side.lower() == 'buy' else '🔴'
return f"""
{side_emoji} <b>訂單執行成功</b>
📋 <b>訂單ID:</b>{order_id}
📊 <b>標的:</b>{symbol}
📈 <b>方向:</b>{side.upper()}
💰 <b>價格:</b>${price:,.2f}
📏 <b>數量:</b>{size}
💵 <b>總值:</b>${price * size:,.2f}
🕒 <b>時間:</b>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
@staticmethod
def pnl_update(pnl_data: Dict) -> str:
"""損益更新通知"""
total_pnl = pnl_data.get('total_pnl', 0)
daily_pnl = pnl_data.get('daily_pnl', 0)
unrealized_pnl = pnl_data.get('unrealized_pnl', 0)
portfolio_value = pnl_data.get('portfolio_value', 0)
pnl_emoji = '📈' if total_pnl >= 0 else '📉'
daily_emoji = '🟢' if daily_pnl >= 0 else '🔴'
return f"""
{pnl_emoji} <b>損益報告</b>
💰 <b>投資組合總值:</b>${portfolio_value:,.2f}
📊 <b>總損益:</b>${total_pnl:,.2f}
{daily_emoji} <b>今日損益:</b>${daily_pnl:,.2f}
⏳ <b>未實現損益:</b>${unrealized_pnl:,.2f}
📅 <b>更新時間:</b>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
@staticmethod
def risk_alert(alert_data: Dict) -> str:
"""風險警告通知"""
alert_type = alert_data.get('type', 'Unknown')
message = alert_data.get('message', 'Unknown risk detected')
severity = alert_data.get('severity', 'medium')
severity_emoji = {
'low': '🟡',
'medium': '🟠',
'high': '🔴',
'critical': '🚨'
}.get(severity, '⚠️')
return f"""
{severity_emoji} <b>風險警告</b>
⚠️ <b>類型:</b>{alert_type}
📝 <b>描述:</b>{message}
🎚️ <b>嚴重程度:</b>{severity.upper()}
🕒 <b>時間:</b>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
請立即檢查系統狀況!
"""
@staticmethod
def system_status(status_data: Dict) -> str:
"""系統狀態通知"""
status = status_data.get('status', 'unknown')
components = status_data.get('components', {})
uptime = status_data.get('uptime', 'Unknown')
status_emoji = {
'healthy': '🟢',
'warning': '🟡',
'error': '🔴',
'offline': '⚫'
}.get(status, '❓')
component_status = '\n'.join([
f" • {name}: {'✅' if health else '❌'}"
for name, health in components.items()
])
return f"""
{status_emoji} <b>系統狀態報告</b>
🖥️ <b>總體狀態:</b>{status.upper()}
⏱️ <b>運行時間:</b>{uptime}
<b>組件狀態:</b>
{component_status}
🕒 <b>檢查時間:</b>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
@staticmethod
def daily_summary(summary_data: Dict) -> str:
"""每日總結報告"""
trades_count = summary_data.get('trades_count', 0)
total_volume = summary_data.get('total_volume', 0)
pnl = summary_data.get('pnl', 0)
win_rate = summary_data.get('win_rate', 0)
top_performer = summary_data.get('top_performer', 'N/A')
pnl_emoji = '📈' if pnl >= 0 else '📉'
return f"""
📊 <b>每日交易總結</b>
🔢 <b>交易次數:</b>{trades_count}
💵 <b>交易量:</b>${total_volume:,.2f}
{pnl_emoji} <b>淨損益:</b>${pnl:,.2f}
🎯 <b>勝率:</b>{win_rate:.1%}
🏆 <b>最佳標的:</b>{top_performer}
📅 <b>日期:</b>{datetime.now().strftime('%Y-%m-%d')}
"""
# src/notifications/notification_manager.py
import asyncio
import logging
from typing import Dict, List, Optional
from enum import Enum
class NotificationLevel(Enum):
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
class NotificationManager:
"""統一通知管理器"""
def __init__(self, telegram_bot: TelegramBot):
self.telegram_bot = telegram_bot
self.templates = MessageTemplates()
self.logger = logging.getLogger(__name__)
# 通知設定
self.notification_settings = {
NotificationLevel.INFO: True,
NotificationLevel.WARNING: True,
NotificationLevel.ERROR: True,
NotificationLevel.CRITICAL: True
}
# 重複通知防護
self.recent_notifications = {}
self.duplicate_threshold = 300 # 5分鐘內相同通知視為重複
async def send_trading_signal(self, signal_data: Dict):
"""發送交易信號通知"""
message = self.templates.trading_signal(signal_data)
await self._send_notification(message, NotificationLevel.INFO)
async def send_order_executed(self, order_data: Dict):
"""發送訂單執行通知"""
message = self.templates.order_executed(order_data)
await self._send_notification(message, NotificationLevel.INFO)
async def send_pnl_update(self, pnl_data: Dict):
"""發送損益更新通知"""
message = self.templates.pnl_update(pnl_data)
await self._send_notification(message, NotificationLevel.INFO)
async def send_risk_alert(self, alert_data: Dict):
"""發送風險警告"""
severity = alert_data.get('severity', 'medium')
level = {
'low': NotificationLevel.INFO,
'medium': NotificationLevel.WARNING,
'high': NotificationLevel.ERROR,
'critical': NotificationLevel.CRITICAL
}.get(severity, NotificationLevel.WARNING)
message = self.templates.risk_alert(alert_data)
await self._send_notification(message, level, allow_duplicate=False)
async def send_system_status(self, status_data: Dict):
"""發送系統狀態通知"""
message = self.templates.system_status(status_data)
await self._send_notification(message, NotificationLevel.INFO)
async def send_daily_summary(self, summary_data: Dict):
"""發送每日總結"""
message = self.templates.daily_summary(summary_data)
await self._send_notification(message, NotificationLevel.INFO)
async def send_custom_message(self, message: str, level: NotificationLevel = NotificationLevel.INFO):
"""發送自訂訊息"""
await self._send_notification(message, level)
async def _send_notification(self, message: str, level: NotificationLevel,
allow_duplicate: bool = True):
"""內部通知發送方法"""
# 檢查通知等級是否啟用
if not self.notification_settings.get(level, True):
return
# 檢查重複通知
if not allow_duplicate and self._is_duplicate_notification(message):
self.logger.debug("Duplicate notification blocked")
return
# 設定優先級
priority = {
NotificationLevel.INFO: "normal",
NotificationLevel.WARNING: "high",
NotificationLevel.ERROR: "high",
NotificationLevel.CRITICAL: "urgent"
}.get(level, "normal")
try:
await self.telegram_bot.send_message(message, priority=priority)
self.logger.info(f"Notification sent: {level.value}")
# 記錄通知歷史
self._record_notification(message)
except Exception as e:
self.logger.error(f"Failed to send notification: {e}")
def _is_duplicate_notification(self, message: str) -> bool:
"""檢查是否為重複通知"""
message_hash = hash(message)
now = datetime.now()
if message_hash in self.recent_notifications:
last_sent = self.recent_notifications[message_hash]
if (now - last_sent).total_seconds() < self.duplicate_threshold:
return True
return False
def _record_notification(self, message: str):
"""記錄通知歷史"""
message_hash = hash(message)
self.recent_notifications[message_hash] = datetime.now()
# 清理舊記錄
cutoff_time = datetime.now() - timedelta(seconds=self.duplicate_threshold * 2)
self.recent_notifications = {
msg_hash: timestamp
for msg_hash, timestamp in self.recent_notifications.items()
if timestamp > cutoff_time
}
def set_notification_level(self, level: NotificationLevel, enabled: bool):
"""設定通知等級開關"""
self.notification_settings[level] = enabled
self.logger.info(f"Notification level {level.value} {'enabled' if enabled else 'disabled'}")
# src/trading_engine/core.py (更新版本)
class TradingEngine:
def __init__(self, config: Config):
# ... 其他初始化程式碼 ...
# 初始化通知系統
telegram_config = TelegramConfig(
bot_token=config.telegram.bot_token,
chat_id=config.telegram.chat_id
)
self.telegram_bot = TelegramBot(telegram_config)
self.notification_manager = NotificationManager(self.telegram_bot)
async def start(self):
"""啟動交易引擎"""
try:
# 啟動通知系統
await self.telegram_bot.start()
# 發送系統啟動通知
await self.notification_manager.send_custom_message(
"🚀 <b>交易系統啟動</b>\n\n系統已成功啟動,開始監控市場...",
NotificationLevel.INFO
)
# ... 其他啟動程式碼 ...
except Exception as e:
await self.notification_manager.send_custom_message(
f"❌ <b>系統啟動失敗</b>\n\n錯誤: {str(e)}",
NotificationLevel.CRITICAL
)
raise
async def execute_trade(self, signal):
"""執行交易並發送通知"""
try:
# 執行交易
order_result = await self.order_manager.execute_order(signal)
if order_result.success:
# 發送成功通知
await self.notification_manager.send_order_executed({
'symbol': signal.symbol,
'side': signal.side,
'size': signal.size,
'price': order_result.price,
'order_id': order_result.order_id
})
else:
# 發送失敗通知
await self.notification_manager.send_custom_message(
f"⚠️ <b>交易執行失敗</b>\n\n標的: {signal.symbol}\n錯誤: {order_result.error}",
NotificationLevel.ERROR
)
except Exception as e:
await self.notification_manager.send_custom_message(
f"❌ <b>交易執行異常</b>\n\n{str(e)}",
NotificationLevel.CRITICAL
)
# src/tasks/reporting.py
import asyncio
from datetime import datetime, time
class ReportingTasks:
"""定期報告任務"""
def __init__(self, notification_manager: NotificationManager,
portfolio_manager, trading_engine):
self.notification_manager = notification_manager
self.portfolio_manager = portfolio_manager
self.trading_engine = trading_engine
async def start_scheduled_reports(self):
"""啟動定時報告"""
# 每日總結報告 (每天晚上23:00)
asyncio.create_task(self._daily_summary_task())
# 每小時損益報告
asyncio.create_task(self._hourly_pnl_task())
# 系統健康檢查 (每30分鐘)
asyncio.create_task(self._health_check_task())
async def _daily_summary_task(self):
"""每日總結任務"""
while True:
now = datetime.now()
# 計算到晚上23:00的秒數
target_time = now.replace(hour=23, minute=0, second=0, microsecond=0)
if target_time <= now:
target_time = target_time.replace(day=target_time.day + 1)
sleep_seconds = (target_time - now).total_seconds()
await asyncio.sleep(sleep_seconds)
# 生成每日總結
summary_data = await self.portfolio_manager.get_daily_summary()
await self.notification_manager.send_daily_summary(summary_data)
async def _hourly_pnl_task(self):
"""每小時損益報告"""
while True:
await asyncio.sleep(3600) # 1小時
pnl_data = await self.portfolio_manager.get_pnl_summary()
await self.notification_manager.send_pnl_update(pnl_data)
async def _health_check_task(self):
"""系統健康檢查"""
while True:
await asyncio.sleep(1800) # 30分鐘
health_status = await self.trading_engine.get_system_health()
await self.notification_manager.send_system_status(health_status)
今天我們完成了整個量化交易系統的最後一塊拼圖 - Telegram 通知系統,就像為農場安裝了現代化的通訊設備。
30天學習回顧:
Telegram 通知系統特色:
完整系統架構優勢:
從一個鄉下小孩的角度,我們成功建立了一個現代化的量化交易系統,就像把傳統農業轉變為智慧農業一樣。這個系統不僅能自動化執行交易策略,還能即時監控並通知重要事件。
學習成果:
這30天的學習旅程結束了,但真正的量化交易之路才剛開始。記住爸爸說過的話:「種田要有耐心,技術要持續學習」。量化交易也是如此,需要不斷學習、優化和適應市場變化。
願這個系列能幫助大家踏上成功的量化交易之路!🚀
系列完結 - 感謝大家30天的陪伴!